Hudi on Flink

Applies to Hudi 0.10.1.

1 Guide

2 Deployment

Testing with the Flink SQL Client

The current Hudi version works with Flink 1.13.x and Scala 2.11.

(1) Get the hudi-flink-bundle jar

The hudi-flink-bundle jar can be built from hudi-source-dir/packaging/hudi-flink-bundle, or downloaded from the Apache Official Repository.

(2) Start the Flink cluster

Start a standalone Flink cluster within a Hadoop environment.

The recommended configuration is as follows:

  • $FLINK_HOME/conf/flink-conf.yaml

    • Add the config taskmanager.numberOfTaskSlots: 4
    • Add other global configurations as needed; see the Flink configuration documentation
  • $FLINK_HOME/conf/workers

    Add 4 lines of localhost to configure 4 local workers (a sketch of both files follows this list)
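
A minimal sketch of the two files, assuming a single-machine setup (everything other than the slot count is an illustrative default):

# $FLINK_HOME/conf/flink-conf.yaml (excerpt)
taskmanager.numberOfTaskSlots: 4

# $FLINK_HOME/conf/workers
localhost
localhost
localhost
localhost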

Start the cluster:

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the Flink standalone cluster
./bin/start-cluster.sh
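
To check that the cluster came up, open the Flink web UI (http://localhost:8081 by default) or inspect the JVM processes; the process names below are what a standalone Flink cluster normally shows:

# expect one JobManager plus the configured TaskManagers
jps
# e.g. StandaloneSessionClusterEntrypoint (JobManager) and TaskManagerRunner (TaskManager)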


(3) Start the SQL CLI

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j .../hudi-flink-bundle_2.1?-*.*.*.jar shell
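
For example, with Hudi 0.10.1 built for Scala 2.11, the invocation might look like this (the jar location is hypothetical; point -j at wherever the bundle actually lives):

./bin/sql-client.sh embedded -j ~/lib/hudi-flink-bundle_2.11-0.10.1.jar shell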

Note:

  • Hadoop 2.9.x+ is recommended, because some object storage filesystem implementations only appear in later Hadoop versions
  • The flink-parquet and flink-avro formats are already packaged into the hudi-flink-bundle jar
  • The SQL CLI executes SQL statements one line at a time

3 Insert Data

Create the table first, then insert the data:

-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;

CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);

-- insert data using values
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
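
Once the INSERT succeeds, Hudi materializes the table under ${path}. Assuming a local file: path, a quick sanity check is to look for Hudi's metadata directory:

# the .hoodie directory holds Hudi's commit timeline and metadata
ls ${path}/.hoodie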

4 Query Data

Snapshot query

-- query from the Hudi table
select * from t1;
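
Since this is ordinary batch SQL over the current snapshot, filters and projections work as usual; for example, to read a single partition:

-- snapshot query restricted to one partition
select uuid, name, age from t1 where `partition` = 'par1';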

5 Update Data

Updates use the same syntax as inserts: writing a row whose key already exists overwrites the previous record.

-- this would update the record with key 'id1'
insert into t1 values
('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');
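
Re-running the snapshot query should now return age 27 for key 'id1', because the new write replaced the earlier record:

-- verify that the update took effect
select * from t1 where uuid = 'id1';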

6 Streaming Query

Queries data committed from a specified instant onward; the starting instant is set with the read.start-commit option.

CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${path}',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true', -- this option enables the streaming read
'read.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t1;
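
While the streaming query runs, new commits are picked up at the configured check interval. Writing into the same table path from a second SQL client session (reusing the table definition from section 3) should make the new row appear in the streaming result within a few seconds:

-- run from another session writing to the same 'path'
insert into t1 values
('id9','Ada',29,TIMESTAMP '1970-01-01 00:00:09','par5');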

7 Delete Data

While a streaming query is running, changelog events from the upstream source are still applied to the table row by row, including UPDATE and DELETE records.
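
A minimal sketch of how deletes typically arrive, assuming a Kafka topic that carries a Debezium changelog (the topic name and broker address are hypothetical):

-- changelog source: Debezium DELETE events become delete (-D) rows in Flink
CREATE TABLE user_changes(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
) WITH (
'connector' = 'kafka',
'topic' = 'user_changes', -- hypothetical topic
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);

-- each DELETE row removes the matching key from the Hudi table
INSERT INTO t1 SELECT * FROM user_changes;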

8 Miscellaneous

Topic discussions and use cases
